- NatsStreamConsumer.GetMessages() now lazily retries initialization when _consumer is null, making the consumer self-healing after transient JetStream failures (leader election, timeout, network blip). Log level changed from Error to Warning for transient retry attempts.
- Added NumReplicas property to NatsOptions (default 1, backward compatible). NatsConnectionManager now passes NumReplicas to StreamConfig.
- NatsConnectionManager handles NATS error code 10058 (stream exists with different config) by attempting an in-place UpdateStreamAsync, enabling replica count upgrades without manual stream deletion.
- Added NumReplicas validation (>= 1) to NatsStreamOptionsValidator.

- Unit tests for NatsStreamOptionsValidator: invalid NumReplicas (0, -1, -100) throws OrleansConfigurationException; valid values (1, 3, 5) pass.
- Unit tests for existing StreamName validation (null, whitespace).
- Default value assertion: NumReplicas defaults to 1.
- Integration test: verifies NumReplicas=1 is applied to JetStream StreamConfig.
- R3 testing noted as requiring a multi-node NATS cluster (CI-level concern).
@dotnet-policy-service agree company="Microsoft"
Pull request overview
This PR improves resiliency of the Orleans NATS JetStream streaming provider in multi-node NATS clusters by (1) enabling consumer self-healing after transient JetStream errors and (2) allowing stream replica count configuration so rolling restarts don’t permanently break consumption.
Changes:
- Add lazy re-initialization of `NatsStreamConsumer` when `_consumer` is null during polling.
- Introduce `NatsOptions.NumReplicas` with validation and apply it when creating JetStream streams (plus attempt stream update on config mismatch).
- Add tests covering `NumReplicas` defaults/validation and a JetStream config assertion.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| test/Extensions/Orleans.Streaming.NATS.Tests/NatsOptionsTests.cs | Adds unit tests for options validation and a JetStream-facing test related to replicas. |
| src/Orleans.Streaming.NATS/Providers/NatsStreamConsumer.cs | Adds retry-on-poll initialization logic and lowers severity of “not initialized” logging. |
| src/Orleans.Streaming.NATS/Providers/NatsConnectionManager.cs | Plumbs NumReplicas into stream creation and attempts UpdateStreamAsync on config mismatch. |
| src/Orleans.Streaming.NATS/NatsOptions.cs | Adds NumReplicas option and validates it is at least 1. |
    this._logger.LogWarning(
        "NATS Consumer not initialized — attempting re-initialization. Provider: {Provider} | Stream: {Stream} | Partition: {Partition}.",
        provider, stream, partition);

    await Initialize(cancellationToken);
GetMessages logs a warning on every poll cycle while the consumer is uninitialized. During a longer outage or misconfiguration this can still produce very high log volume (poll cadence is ~100ms). Consider adding a backoff or rate limit (e.g., log once and then periodically, or only log after N consecutive failures), while still retrying initialization each cycle.
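One way to follow this suggestion is to keep retrying on every poll but gate only the log call. The sketch below is illustrative and not part of the PR: `RetryLogThrottle`, `ShouldLog`, and `Reset` are hypothetical names, and the class has no dependency on Orleans or NATS.

```csharp
using System;

// Hypothetical helper (not in the PR): decides when a repeated
// "consumer not initialized" warning should actually be written.
public sealed class RetryLogThrottle
{
    private readonly int _logEvery;
    private int _consecutiveFailures;

    public RetryLogThrottle(int logEvery)
    {
        if (logEvery < 1) throw new ArgumentOutOfRangeException(nameof(logEvery));
        _logEvery = logEvery;
    }

    // Call once per failed poll. Returns true for the first failure of an
    // outage and then once every logEvery failures after that.
    public bool ShouldLog()
    {
        var seenSoFar = _consecutiveFailures++;
        return seenSoFar % _logEvery == 0;
    }

    // Call after initialization succeeds so the next outage logs immediately.
    public void Reset() => _consecutiveFailures = 0;
}
```

In `GetMessages`, the warning would then be wrapped in `if (throttle.ShouldLog())`, with `Initialize` still awaited on every cycle and `Reset()` called once `_consumer` is non-null. With `logEvery` set to 100 and a poll cadence of roughly 100ms, that works out to about one warning every 10 seconds during an outage.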
    /// <summary>
    /// The number of stream replicas in the NATS JetStream cluster.
    /// Higher values improve availability during node restarts (R3 survives
    /// single-node failures in a 3-node cluster). Must be an odd number
    /// and cannot exceed the number of NATS nodes.
    /// Defaults to 1. Set to 3 for production clusters with ≥ 3 nodes.
    /// </summary>
    public int NumReplicas { get; set; } = 1;
The XML docs state NumReplicas "must be an odd number" and "cannot exceed the number of NATS nodes", but the validator only enforces >= 1. Either relax the docs to match what is actually enforced, or add validation for the odd-number requirement (and consider how to handle/enforce the upper bound, if possible).
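If the odd-number wording in the docs is kept, the second option could look roughly like the sketch below. It uses stand-in types so that it compiles on its own: `ReplicaOptions` and `ReplicaValidation` are hypothetical, and `ArgumentException` stands in for the `OrleansConfigurationException` that the real validator throws. The upper bound is not checked here, on the assumption that the cluster's node count is not known when options are validated.

```csharp
using System;

// Stand-in for the real options type; only the property under discussion is modeled.
public sealed class ReplicaOptions
{
    public int NumReplicas { get; set; } = 1;
}

public static class ReplicaValidation
{
    // Hypothetical stricter check: at least 1, and odd.
    public static void Validate(ReplicaOptions options)
    {
        if (options is null) throw new ArgumentNullException(nameof(options));

        if (options.NumReplicas < 1)
        {
            throw new ArgumentException(
                $"NumReplicas must be at least 1 (was {options.NumReplicas}).");
        }

        if (options.NumReplicas % 2 == 0)
        {
            throw new ArgumentException(
                $"NumReplicas must be odd (was {options.NumReplicas}).");
        }
    }
}
```

Relaxing the XML docs instead would avoid rejecting configurations that currently pass validation.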
    var streamConfig = new StreamConfig(this._options.StreamName, [$"{this._providerName}.>"])
    {
        Retention = StreamConfigRetention.Workqueue,
        NumReplicas = this._options.NumReplicas,
        SubjectTransform = new SubjectTransform
streamConfig is constructed twice (CreateStreamAsync + UpdateStreamAsync) with the same fields. Consider extracting a private helper or local function that builds the config once, to avoid future drift (e.g., if Retention or SubjectTransform changes later but only one path is updated).
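The shape of that refactoring can be sketched without the NATS client types. Everything below is a stand-in: `StreamSettings` plays the role of `StreamConfig`, `StreamExistsWithDifferentConfigException` plays the role of the JetStream API error carrying code 10058, and the `create` and `update` delegates represent the calls to `CreateStreamAsync` and `UpdateStreamAsync`. The point is only that a single built instance feeds both paths.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in for StreamConfig; the real type has more fields (Retention, SubjectTransform, ...).
public sealed record StreamSettings(string Name, string SubjectFilter, int NumReplicas);

// Stand-in for the "stream exists with different config" error (code 10058).
public sealed class StreamExistsWithDifferentConfigException : Exception { }

public static class StreamSetup
{
    // The single place where the config is assembled.
    public static StreamSettings Build(string streamName, string providerName, int numReplicas)
        => new(streamName, $"{providerName}.>", numReplicas);

    // Tries to create the stream; on a config mismatch, updates it in place
    // using the very same settings instance.
    public static async Task<string> CreateOrUpdateAsync(
        StreamSettings settings,
        Func<StreamSettings, Task> create,
        Func<StreamSettings, Task> update)
    {
        try
        {
            await create(settings);
            return "created";
        }
        catch (StreamExistsWithDifferentConfigException)
        {
            await update(settings);
            return "updated";
        }
    }
}
```

Because both delegates receive the same object, a later change to `Build` cannot reach one path and miss the other.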
    var streamConfig = new StreamConfig(streamName, [$"test-replicas-provider.>"])
    {
        Retention = StreamConfigRetention.Workqueue,
        NumReplicas = 1
    };
NumReplicas_IsAppliedToJetStreamConfig doesn't exercise the provider path which was changed (NatsOptions -> NatsConnectionManager -> StreamConfig). The test currently hard-codes NumReplicas = 1 directly on StreamConfig, so it will pass even if _options.NumReplicas is never applied by the provider. Consider updating this to initialize a NatsConnectionManager with options.NumReplicas and asserting the resulting stream info, or rename the test so it reflects what is actually being validated.
NATS streaming consumer permanent failure after transient JetStream errors
Problem
When running Orleans against a multi-node NATS JetStream cluster, a rolling restart of NATS nodes (routine maintenance, crash recovery, scaling) causes permanent consumer failure with no recovery path short of restarting the Orleans silo.
Two issues combine to produce this:
1. No retry on consumer initialization
`NatsStreamConsumer.Initialize()` is called exactly once during startup. If it fails due to a transient error (timeout during JetStream leader election, network blip, temporary unavailability), the internal `_consumer` field stays `null` permanently. Every subsequent `GetMessages()` poll (~100ms) logs at `Error` level: … and returns empty, indefinitely, with no self-healing path.
2. Hardcoded R1 streams
`NatsConnectionManager.Initialize()` creates the JetStream stream without setting `NumReplicas`, defaulting to R1 (single replica). R1 streams have exactly one leader; any node restart makes the stream temporarily unavailable during leader election, which is the trigger for bug #1.
Combined effect: After a NATS rolling update, Orleans consumers enter a permanent error loop on every poll cycle, producing a flood of error logs and zero message delivery until the entire Orleans pod is restarted.
Root Cause
Changes
| File | Change |
|---|---|
| NatsStreamConsumer.cs | When `_consumer` is null in `GetMessages()`, attempt lazy re-initialization before returning empty. Piggybacks on the existing Orleans pulling agent poll cadence. Log level changed from `Error` to `Warning`: transient retries during rolling updates are expected, not permanent failures. |
| NatsOptions.cs | Added `NumReplicas` property (default `1`, backward compatible). Added validation in `NatsStreamOptionsValidator` ensuring `NumReplicas >= 1`. |
| NatsConnectionManager.cs | Passes `NumReplicas` to `StreamConfig` when creating JetStream streams. Handles NATS error code 10058 (stream exists with different config) by attempting an in-place `UpdateStreamAsync`, enabling replica count upgrades without manual stream deletion. |
| NatsOptionsTests.cs (new) | Unit tests for validation (invalid `NumReplicas`, missing/empty `StreamName`), default value assertion, and an integration test verifying `NumReplicas` flows through to JetStream stream config. |
Usage
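A minimal sketch of opting in to three replicas, under stated assumptions: `NatsOptionsStandIn` is a hypothetical type that mirrors only the two option properties named in this PR, and `Configure` imitates the `Action<TOptions>` callback shape commonly used for provider configuration. The provider's actual registration method is not shown in this description, so it is not reproduced here.

```csharp
using System;

// Stand-in mirroring the two option properties named in this PR;
// the real NatsOptions has more members.
public sealed class NatsOptionsStandIn
{
    public string StreamName { get; set; } = "";
    public int NumReplicas { get; set; } = 1; // same default as the PR
}

public static class UsageSketch
{
    // Imitates an options callback: create defaults, then let the caller override.
    public static NatsOptionsStandIn Configure(Action<NatsOptionsStandIn> configure)
    {
        var options = new NatsOptionsStandIn();
        configure(options);
        return options;
    }

    public static NatsOptionsStandIn Example() =>
        Configure(options =>
        {
            options.StreamName = "orders"; // illustrative stream name
            options.NumReplicas = 3;       // R3, for clusters with at least 3 NATS nodes
        });
}
```

Leaving `NumReplicas` unset keeps the R1 default.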
Backward Compatibility
- `NumReplicas` defaults to `1`: existing deployments are unaffected.
- The `GetMessages()` change is purely additive: previously broken consumers now self-heal.
Testing
- Integration test verifies `NumReplicas` is applied to JetStream stream config.
- `NatsAdapterTests.SendAndReceiveFromNats` failures are unrelated (cache cursor requests `SeqNum=0` but JetStream sequences start at 1).